BP-49: Support read ahead async #3111

Open
wants to merge 8 commits into base: master

wuzhanpeng

Master issue: #3085

@dlg99 dlg99 requested review from merlimat, sijie and eolivelli March 15, 2022 23:40
@wuzhanpeng
Author

Hi @merlimat @sijie @eolivelli @dlg99, could you help review this PR?

// Async mode
entry = readAheadManager.readEntryOrWait(ledgerId, entryId);
} else {
// Sync mode
Contributor

Could you encapsulate the original sync-mode implementation into one meaningful method?


dbLedgerStorageStats.getReadCacheMissCounter().inc();
dbLedgerStorageStats.getReadCacheMissCounter().inc();
Contributor

There are redundant spaces prefixing this line; please correct the style. The same applies to the following lines.

private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;

private static final class LedgerEntryPosition {
Contributor

Would it be more suitable to move the LedgerEntryPosition class into a separate file? It looks like a common definition that could be used in other places.

Contributor

This class is only used in ReadAheadManager, so it's better to keep it here.

@ArvinDevel
Contributor

@wuzhanpeng I left some comments. Two other things:
1) Could you move all the public methods to the front of ReadAheadManager, followed by the private/protected methods, so that it is easier to see what the class does?
2) Could you add unit tests for ReadAheadManager?

}

@Override
public boolean equals(Object o) {
Contributor

We can use Lombok's @Data to cover equals and hashCode.
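
For context on this suggestion: Lombok's @Data generates equals, hashCode, toString and accessors from a class's fields. The sketch below shows, in plain Java, roughly the hand-written equals/hashCode pair that the annotation would replace. The two fields (ledgerId, entryId) and the constructor are assumptions inferred from the class name; they are not taken from the PR.

```java
import java.util.Objects;

// Hypothetical value class; field names are assumed, not copied from the PR.
final class LedgerEntryPosition {
    private final long ledgerId;
    private final long entryId;

    LedgerEntryPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    // Two positions are equal when both ids match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LedgerEntryPosition)) {
            return false;
        }
        LedgerEntryPosition other = (LedgerEntryPosition) o;
        return ledgerId == other.ledgerId && entryId == other.entryId;
    }

    // Must stay consistent with equals so the class works as a map key.
    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId);
    }
}
```

With Lombok on the classpath, annotating the class would let both methods be deleted, at the cost of an extra build-time dependency for this one class.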

} catch (InterruptedException e) {
log.warn("Failed to read entries={} ahead from L{} E{} due to timeout={}ms",
readAheadEntries, ledgerId, startEntryId, readAheadTimeoutMs);
readCompleted();
Contributor

When the condition's await throws InterruptedException and execution reaches readCompleted, it will cause a deadlock.

Author

This will not cause a deadlock on interruption. The lock associated with this Condition is atomically released when await is called. See https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
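
The behaviour described in the linked Javadoc can be checked with a small standalone demo. This is a sketch, not code from the PR: the class name, the latch, and the signalAll() call in the catch block are all hypothetical, and it does not show what readCompleted() does. It illustrates two documented properties of Condition.await: the lock is released while the thread waits, and the thread re-acquires the lock before await returns, including when it returns by throwing InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; it is not part of the PR.
final class AwaitInterruptDemo {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Condition DONE = LOCK.newCondition();

    // Interrupts a thread blocked in await() and reports whether that thread
    // held the lock inside its InterruptedException handler.
    static boolean lockHeldInCatchAfterInterrupt() {
        CountDownLatch waiting = new CountDownLatch(1);
        boolean[] heldInCatch = new boolean[1];

        Thread waiter = new Thread(() -> {
            LOCK.lock();
            try {
                waiting.countDown();
                DONE.await(30, TimeUnit.SECONDS); // releases LOCK while waiting
            } catch (InterruptedException e) {
                // await() has re-acquired LOCK before throwing, so calling
                // signalAll() here is legal.
                heldInCatch[0] = LOCK.isHeldByCurrentThread();
                DONE.signalAll();
            } finally {
                LOCK.unlock();
            }
        });
        waiter.start();

        try {
            waiting.await();
            // This succeeds only after the waiter has released LOCK in await().
            LOCK.lock();
            LOCK.unlock();
            waiter.interrupt();
            waiter.join(); // join makes the waiter's write to heldInCatch visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return heldInCatch[0];
    }

    public static void main(String[] args) {
        System.out.println(lockHeldInCatchAfterInterrupt());
    }
}
```

The method is expected to return true, and the main thread being able to take the lock while the waiter is parked shows that an interrupted waiter does not keep other threads locked out.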

* @return
* @throws IOException
*/
public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException {
Contributor

We can simplify the function name.

public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";

private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000;
private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;
Contributor

How do we decide the default maximum number of read-ahead bytes?

Author

DEFAULT_READ_AHEAD_BYTES and DEFAULT_READ_AHEAD_MESSAGES are the maximum number of bytes and messages, respectively, that a single read-ahead task can read. When either limit is reached, the current read-ahead task ends.
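
The two-limit stop condition can be sketched as a simple loop. This is an illustration only: apart from the two default constants quoted in the diff, the class, method and parameter names are hypothetical, entries are assumed to have a uniform size, and the PR may check the limits at a slightly different point in its loop.

```java
// Hypothetical sketch of the stop condition; not the PR's implementation.
final class ReadAheadLimits {
    static final int DEFAULT_READ_AHEAD_MESSAGES = 1000;
    static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;

    // Returns how many entries a single task reads when `available` entries
    // of `entrySize` bytes each follow the start position.
    static int entriesReadByOneTask(int available, int entrySize,
                                    int maxMessages, int maxBytes) {
        int count = 0;
        long bytes = 0;
        // Stop as soon as either limit is reached or the entries run out.
        while (count < available && count < maxMessages && bytes < maxBytes) {
            bytes += entrySize;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Small entries: the message limit is reached first.
        System.out.println(entriesReadByOneTask(5000, 100,
                DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES));
        // 4 KiB entries: the byte limit is reached first.
        System.out.println(entriesReadByOneTask(5000, 4096,
                DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES));
    }
}
```

Under these assumptions the byte limit is the binding one whenever the average entry is larger than about 262 bytes (256 * 1024 / 1000), so the 1000-entry limit only comes into play for small entries.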

public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately";
public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";

private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000;
Contributor

We'd better decrease the default maximum number of read-ahead entries.

Author

In our practice, it is more cost-effective to read more consecutive entries when the entries are small. In addition, because the read-ahead task terminates on either the maximum number of bytes or the maximum number of entries, the default 256KB byte limit generally keeps the number of entries in a read-ahead task from growing too large.

* A structure that records the transitions of the task status.
*/
public static class ReadAheadTaskStatus {
private long ledgerId;
Contributor

It's better to make it final.

@wuzhanpeng
Author

I have replied to the comments and updated the code accordingly. Please take another look~ @hangc0276 @ArvinDevel

Contributor

@ArvinDevel ArvinDevel left a comment

I left some comments. I would suggest splitting this PR into several dedicated PRs.

if (enableReadAheadAsync) {
return readEntryUnderAsyncReadAhead(ledgerId, entryId);
} else {
return readEntryUnderSyncReadAhead(ledgerId, entryId);
Contributor

@wuzhanpeng could you divide this PR into several smaller PRs, so that it is easier to review?
1) The first PR would focus on reorganizing the current sync-mode read-ahead, for example moving fillReadAheadCache and related methods into ReadAheadManager. It would not introduce the new feature, only prepare for it, and should have no impact on the system.
2) The following PRs (one or more) would focus on the functionality. For example, the core functionality and the monitoring could be split into different PRs.

* @return
* @throws IOException
*/
private ByteBuf readEntryUnderAsyncReadAhead(long ledgerId, long entryId) throws IOException {
Contributor

The method name is a little redundant. Just use readEntryAsync?

* @return
* @throws IOException
*/
private ByteBuf readEntryUnderSyncReadAhead(long ledgerId, long entryId) throws IOException {
Contributor

Just use readEntrySync or a similar name?

Member

@StevenLuMT StevenLuMT left a comment

I think it's nice, but this branch has conflicts that must be resolved. @wuzhanpeng please fix them.

@hangc0276 hangc0276 modified the milestones: 4.16.0, 4.17.0 Jul 29, 2022
@StevenLuMT
Member

Fix old workflow; please see #3455 for details.

@wuzhanpeng wuzhanpeng force-pushed the bp49_read_ahead_async branch from 6cd125a to b25b0f8 Compare October 19, 2022 08:24
@wuzhanpeng
Author

The conflicts have been resolved. Please check again~

break;
}
} else {
entry = entryLogger.readEntry(ledgerId, entryId, location);
Contributor

To stay consistent with the previous logic, we'd better use
entry = entryLogger.internalReadEntry(ledgerId, entryId, location, false);
and not do entry validation here.

@eolivelli eolivelli removed this from the 4.17.0 milestone May 3, 2024
6 participants